package com.katesoft.scale4j.rttp.messaging.lock;

import org.springframework.integration.Message;
import org.springframework.integration.MessagingException;
import org.springframework.integration.core.MessageHandler;

import com.hazelcast.core.ILock;
import com.katesoft.scale4j.common.utils.AssertUtility;
import com.katesoft.scale4j.rttp.client.IClusterInstanceAware.ClusterInstanceAwareBean;

/**
 * Spring's task executor abstraction is powerful and allows an integration poller to be configured
 * inside a single JVM. However, it does not offer an out-of-the-box way to use a distributed
 * scheduler and job runner like quartz.
 * 
 * This class allows you to handle a message in a 'distributed' fashion. The class tries to lock
 * the message, which prevents other handlers from working with the same message, and if the lock
 * is obtained successfully - message handling takes place by delegating to the actual handler.
 * 
 * A message is skipped (with a debug log entry only) when the cluster instance is not running or
 * when the lock can't be obtained.
 * 
 * @author kate2007
 */
public class MessageUnderLockHandler extends ClusterInstanceAwareBean implements MessageHandler {
   private MessageHandler actualHandler;

   /**
    * Delegates the message to the actual handler while holding the lock identified by the
    * {@link AbstractInboundLockHeaderEnricher#LOCK_ID_HEADER} message header.
    * 
    * The lock is acquired with a non-blocking {@code tryLock()} and is always released after the
    * actual handler returns or throws.
    * 
    * @param message
    *           message to handle, must carry the lock id header
    * @throws MessagingException
    *            propagated from the actual handler
    */
   @Override
   public void handleMessage(Message<?> message) throws MessagingException {
      Object lockKey = message.getHeaders().get(AbstractInboundLockHeaderEnricher.LOCK_ID_HEADER);
      AssertUtility.assertNotNull(lockKey, "[lockId header]");
      // fail fast on a mis-configured bean instead of taking the cluster-wide lock first and then
      // failing with a bare NullPointerException inside the locked section
      AssertUtility.assertNotNull(actualHandler, "[actual handler]");

      if (!getInstance().getLifecycleService().isRunning()) {
         // leave a trace, otherwise the message is dropped without any indication
         logger.debug("cluster instance is not running, skipping message with lock key=%s", lockKey);
         return;
      }

      ILock lock = getInstance().getLock(lockKey);
      if (!lock.tryLock()) {
         logger.debug(
                  "can't obtain lock for key=%s, another thread/jvm processing the same message! nothing to do",
                  lockKey);
         return;
      }

      try {
         logger.debug("lock obtained for key=%s", lockKey);
         actualHandler.handleMessage(message);
      } finally {
         // release even if the actual handler throws, so the key never stays locked by this caller
         lock.unlock();
      }
   }

   /**
    * set original message handler.
    * 
    * @param actualHandler
    *           actual implementation, must not be null
    */
   public void setActualHandler(MessageHandler actualHandler) {
      AssertUtility.assertNotNull(actualHandler, "[actual handler]");
      this.actualHandler = actualHandler;
   }
}
